package com.xiaohu.transfrom.streamtranform;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/*
    值得一提的是, ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的connectedStreams.
keyBy (keySelector1, keySelector2);还是一个ConnectedStreams:这里传入两个参数keySelectorl和keySelector2,
是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。
ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起, 然后针对来源的流再做各自处理,这在一些场景下非常有用。

场景：两个流中有着相同的连接键，使用connect连接后，按照指定的key进行连接，类似于sql中join中的on条件

 */
public class ConnectDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );

        DataStreamSource<Tuple3<Integer, String,Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1",1),
                Tuple3.of(1, "aa2",2),
                Tuple3.of(2, "bb",1),
                Tuple3.of(3, "cc",1)
        );

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> cs = source1.connect(source2);
        //TODO:注意!!! 当并行度大于1的时候，相同的key可能会进入到不同的分区中，这样就无法找到彼此
        //为了在多并行度的情况下，还可以进行连接匹配，就需要进行keyBy让相同的key进入到同一个分区
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> keyedCs = cs.keyBy(e -> e.f0, e2 -> e2.f0);

        /*
            我们要实现互相匹配的效果
            1、两条流不知道那一条数据先来
            2、每条流，有数据来，就先存到一个变量中【实际开发中是存放在状态中】
            3、每条流有数据来的时候，除了存变量中，不知道对方是否有匹配的数据，要去另一条流存到变量查找是否有匹配上的
         */
        SingleOutputStreamOperator<String> ds = keyedCs.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
            Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
            Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

            /**
             * 第一条流的处理逻辑
             * @param value 第一条流的数据
             * @param ctx 上下文
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                // s1的数据来了就存到变量中
                Integer id = value.f0;
                if (!s1Cache.containsKey(id)) {
                    ArrayList<Tuple2<Integer, String>> tuple2s = new ArrayList<>();
                    tuple2s.add(value);
                    s1Cache.put(id, tuple2s);
                } else {
                    s1Cache.get(id).add(value);
                }

                if (s2Cache.containsKey(id)) {
                    for (Tuple3<Integer, String, Integer> tuple3 : s2Cache.get(id)) {
                        out.collect("s1:" + value + "<==================>s2: " + tuple3);
                    }
                }
            }

            /**
             * 第二条流的处理逻辑
             * @param value 第二条流的数据
             * @param ctx 上下文
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                // s1的数据来了就存到变量中
                Integer id = value.f0;
                if (!s2Cache.containsKey(id)) {
                    ArrayList<Tuple3<Integer, String, Integer>> tuple3s = new ArrayList<>();
                    tuple3s.add(value);
                    s2Cache.put(id, tuple3s);
                } else {
                    s2Cache.get(id).add(value);
                }

                if (s1Cache.containsKey(id)) {
                    for (Tuple2<Integer, String> tuple2 : s1Cache.get(id)) {
                        out.collect("s1: " + tuple2 + "<==================>s2: " + value);
                    }
                }
            }
        });
        ds.print();


        env.execute();
    }
}
